package netService

import (
	"gitee.com/ling-bin/go-utils/timingwheel"
	"gitee.com/ling-bin/network/netInterface"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

// UdpConnection 连接结构体
type UdpConnection struct {
	server           *UdpService        //当前conn属于哪个server，在conn初始化的时候添加即可
	conn             *net.UDPConn       //当前连接的socket 套接字
	connId           uint64             //当前连接的ID 也可以称作为SessionID，ID全局唯一
	session          interface{}        //标志(用于给业务层使用)
	remoteAddr       net.Addr           //客户端地址
	isClosed         bool               //当前连接的关闭状态 [ true:关闭，false:开 ]
	recPackCount     uint64             //上行当前处理的包总数（处理前，1开始）
	recTotalByteSize uint64             //上行总大小(字节)
	repPackCount     uint64             //下行当前处理的包总数（处理后）
	repTotalByteSize uint64             //下行成功总大小(字节)
	repPackErrCount  uint64             //下发异常包个数
	incr             int64              //供业务下发使用流水号
	heartTime        time.Time          //连接最后一次接收数据时间(每包更新)
	sendTime         time.Time          //连接最后一次发送数据时间(每包更新)
	startTime        time.Time          //连接建立时间
	property         sync.Map           //链接属性
	connLock         sync.RWMutex       //保护链接属性修改的锁
	iTime            *timingwheel.Timer //对应时间轮任务
}

// NewUdpConnection Udp连接
func NewUdpConnection(server *UdpService, conn *net.UDPConn, connId uint64, remoteAddr net.Addr) *UdpConnection {
	uConn := &UdpConnection{
		server:     server,
		conn:       conn,
		connId:     connId,
		isClosed:   true,
		remoteAddr: remoteAddr,
		heartTime:  time.Now(),
		sendTime:   time.Now(),
		startTime:  time.Now(),
		property:   sync.Map{}, //make(map[string]interface{}),
	}
	//添加连接
	uConn.server.GetConnMgr().Add(uConn)
	uConn.addDetection()
	return uConn
}

// GetSession 获取session
func (u *UdpConnection) GetSession() interface{} {
	return u.session
}

// SetSession 设置session
func (u *UdpConnection) SetSession(session interface{}) {
	u.session = session
}

// addDetection 添加监测
func (u *UdpConnection) addDetection() {
	expTime := u.GetHeartTime().UTC().Add(u.server.config.KeepTime)
	connId := u.GetConnId()
	u.iTime = timingWhee.AfterTimeFunc(expTime, func() {
		u.clear(connId, expTime)
	})
}

// clear 清理任务
func (u *UdpConnection) clear(connId uint64, odExpTime time.Time) {
	expTime := u.GetHeartTime().UTC().Add(u.server.config.KeepTime)
	if expTime.Sub(odExpTime).Seconds() >= 2 {
		u.iTime = timingWhee.AfterTimeFunc(expTime, func() {
			u.clear(connId, expTime)
		})
		return
	}
	u.Stop()
}

// GetNetConn 获取网络连接
func (u *UdpConnection) GetNetConn() interface{} {
	return u.conn
}

// GetNetwork 获取网络类型
func (u *UdpConnection) GetNetwork() string {
	return u.server.config.Network
}

// GetConnId 获取当前连接ID
func (u *UdpConnection) GetConnId() uint64 {
	return u.connId
}

// GetRemoteAddr 获取远程客户端地址信息
func (u *UdpConnection) GetRemoteAddr() net.Addr {
	return u.remoteAddr
}

// GetLocalAddr 获取本地地址信息
func (u *UdpConnection) GetLocalAddr() net.Addr {
	return u.conn.LocalAddr()
}

// CallLogHandle 调用异常处理
func (u *UdpConnection) CallLogHandle(level netInterface.ErrLevel, msgAry ...interface{}) {
	u.server.CallLogHandle(level, msgAry)
}

// GetIsClosed 获取的状态[脏读]（ture:关闭状态，false:未关闭）
func (u *UdpConnection) GetIsClosed() bool {
	return u.isClosed
}

// Incr 连接提供给业务作为流水号使用,循环累加,(val 为正数为递增，val为负数为递减,val为0则获取值)
func (u *UdpConnection) Incr(val int64) int64 {
	return atomic.AddInt64(&u.incr, val)
}

// Start 启动连接，让当前连接开始工作[只会调用一次]
func (u *UdpConnection) Start() {
	if u.isClosed {
		u.isClosed = false
		u.server.CallOnConnStart(u)
	}
}

// Stop 停止连接，结束当前连接状态
func (u *UdpConnection) Stop() {
	if u.isClosed {
		return
	}
	u.connLock.Lock()
	if u.isClosed {
		u.connLock.Unlock()
		return
	}
	u.isClosed = true
	u.connLock.Unlock()
	if u.iTime != nil {
		u.iTime.Stop()
		u.iTime = nil
	}

	//发出关闭通知
	u.server.CallOnConnStop(u)

	//将链接从连接管理器中删除
	u.server.GetConnMgr().Remove(u)
}

// SendData 发送数据给远程的TCP客户端
// Data     下发数据
// CmdCode  指令标识[如: rep 普通回复, cmd 用户操作下发 。。]
func (u *UdpConnection) SendData(data []byte, cmdCode string) error {
	return u.SendDataCall(data, cmdCode, nil, nil)
}

// SendDataCall 发送数据给远程的UDP客户端(带参数和回调)
// Data     下发数据
// Param    下发需要回调携带参数
// CmdCode  指令标识[如: rep 普通回复, cmd 用户操作下发 。。]
// CallFunc 下发后回调函数
func (u *UdpConnection) SendDataCall(data []byte, cmdCode string, param interface{}, callFunc func(netInterface.IConnection, []byte, bool, string, interface{}, error)) error {
	if u.isClosed {
		return netInterface.ClosedErr
	}
	//更新发送时间
	u.sendTime = time.Now()

	//对象池获取
	response := newReplyTask()
	response.ConnId = u.connId
	response.Data = data
	response.CallFunc = callFunc
	response.CmdCode = cmdCode
	response.Param = param
	response.RunReplyTask = u.runReplyTask

	if !u.server.config.OverflowDiscard {
		u.server.replyHandle.SendToTaskQueueWait(response)
		return nil
	}
	err := u.server.replyHandle.SendToTaskQueue(response)
	if err != nil {
		u.server.CallLogHandle(netInterface.Warn, "发送队列已满", err)
	}
	return err
}

// runReplyTask 运行回复任务
func (u *UdpConnection) runReplyTask(replyTask *replyTask) {
	defer func() {
		if r := recover(); r != nil {
			u.CallLogHandle(netInterface.Error, "[udp]运行回复任务异常:", r)
		}
	}()
	count := 0
	var tempDelay time.Duration
	sendData := replyTask.Data
RETRY: //遇到临时错误重试发送
	//设置发送超时时间
	err := u.conn.SetWriteDeadline(time.Now().Add(u.server.config.SendOutTime))
	if err != nil {
		u.replyNotice(replyTask, false, err)
		return
	}
	//发送数据[有异常信息或者发送成功数据和要发的数据不一致]
	if _, err := u.conn.WriteTo(sendData, u.remoteAddr); err != nil {
		count++
		if ne, ok := err.(net.Error); ok && ne.Temporary() {
			if count <= u.server.config.SendRetryCount {
				if count == 0 {
					tempDelay = 1 * time.Microsecond
				} else {
					tempDelay *= 2
				}
				if max := 20 * time.Millisecond; tempDelay > max {
					tempDelay = max
				}
				time.Sleep(tempDelay)
				goto RETRY //临时错误重试
			}
		}
		u.replyNotice(replyTask, false, err)
		return
	}
	u.replyNotice(replyTask, true, nil)
}

// replyNotice 数据发送后通知
func (u *UdpConnection) replyNotice(replyTask *replyTask, isOk bool, err error) {
	u.sendTime = time.Now()
	if replyTask.CallFunc != nil {
		replyTask.CallFunc(u, replyTask.Data, isOk, replyTask.CmdCode, replyTask.Param, err)
	}
	if isOk {
		u.repTotalByteSize += uint64(len(replyTask.Data))
		u.repPackCount++
	} else {
		u.repPackErrCount++
	}
	u.server.CallOnReply(u, replyTask.Data, isOk, replyTask.CmdCode, replyTask.Param, err)
}

func (u *UdpConnection) sendCallFunc(connection netInterface.IConnection, data []byte, isOk bool, cmdCode string, param interface{}, err error) {
	if isOk {
		//成功记录发送次数，发送字节数
		u.repPackCount++
		u.repTotalByteSize += uint64(len(data))
	} else {
		u.repPackErrCount++
	}
	u.server.CallOnReply(connection, data, isOk, cmdCode, param, err)
}

// SetProperty 设置链接属性
func (u *UdpConnection) SetProperty(key string, value interface{}) {
	u.property.Store(key, value)
}

// GetProperty 获取链接属性
func (u *UdpConnection) GetProperty(key string) (interface{}, error) {
	if value, ok := u.property.Load(key); ok {
		return value, nil
	} else {
		return nil, netInterface.NotKey
	}
}

// RemoveProperty 移除链接属性
func (u *UdpConnection) RemoveProperty(key string) {
	u.property.Delete(key)
}

// GetPropertyKeys 获取所有属性key
func (u *UdpConnection) GetPropertyKeys() []string {
	propertyAry := make([]string, 0, 4)
	u.property.Range(func(key, value interface{}) bool {
		propertyAry = append(propertyAry, key.(string))
		return true
	})
	return propertyAry
}

// GetRecInfo 上行当前处理的包总数（处理前，1开始），总大小(字节)
func (u *UdpConnection) GetRecInfo() (count, byteSize uint64) {
	return u.recPackCount, u.recTotalByteSize
}

// GetRepInfo 下行当前处理的包总数（处理后），总大小(字节)
func (u *UdpConnection) GetRepInfo() (count, byteSize, errCount uint64) {
	return u.repPackCount, u.repTotalByteSize, u.repPackErrCount
}

// GetHeartTime 连接最后一次接收数据时间
func (u *UdpConnection) GetHeartTime() time.Time {
	return u.heartTime
}

// GetSendTime  连接最后一次发送数据时间
func (u *UdpConnection) GetSendTime() time.Time {
	return u.sendTime
}

// GetStartTime 连接建立时间
func (u *UdpConnection) GetStartTime() time.Time {
	return u.startTime
}

// onCompleted Udp 不需要分包
func (u *UdpConnection) onCompleted(receive *receiveUdpTask) {
	defer func() {
		if r := recover(); r != nil {
			u.CallLogHandle(netInterface.Error, "[udp]数据处理异常:", r)
		}
	}()

	var hData []byte
	if u.server.config.HDataCache {
		hData = receive.Data[:receive.Count]
	} else {
		hData = make([]byte, receive.Count)
		copy(hData, receive.Data[:receive.Count])
	}
	u.OnReceive(hData)
}

// OnReceive 数据上传完整的一包处理
func (u *UdpConnection) OnReceive(data []byte) {
	//添加包个数
	u.recPackCount++
	//添加处理字节数
	u.recTotalByteSize += uint64(len(data))
	//更新心跳时间
	u.heartTime = time.Now()
	u.server.CallOnReceive(u, data)
}
